package org.adam2.sariel.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * MapReduce的工具类
 */
public class MapReduceUtil {

    /**
     * 解析value字符串，以\t作为分隔符，将每一个列值存储再List中，并返回
     * @param key
     * @param value
     * @return
     */
    public static List<String> getColumnList(LongWritable key, Text value) {
        // 偏移量为0时，表示是文件的第一行，这行是字段名称，需要忽略
        if (key.get() == 0) {
            return null;
        }

        String str = null;
        try {
            // 注意：此处千万不能使用value.toString().getBytes()，否则中文字符串中会有乱码
            str = new String(value.getBytes(), "gbk");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        String[] array=str.split("\t");
        return Arrays.asList(array);
    }

    /**
     * 这个方法已经被废弃了。
     * 解析value字符串，以\t作为分隔符，将每一个列值存储再List中，并返回
     * @param key
     * @param value
     * @return
     */
    public static List<String> getColumnList2(LongWritable key, Text value){
        // 偏移量为0时，表示是文件的第一行，这行是字段名称，需要忽略
        if(key.get() == 0){
            return null;
        }

        String str = null;
        try {
            // 注意：此处千万不能使用value.toString().getBytes()，否则存入hbase后是16进制的
            str = new String(value.getBytes(), "gbk");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        // 用于保存每个单元格的值
        StringBuffer cell=new StringBuffer();
        // 用于保存一行中的所有单元格
        List<String> columnList=new ArrayList<String>();
        int tNum=0;
        for (int i = 0; i < str.length(); i++){
            char c = str.charAt(i);
            // 以\t作为每个单元格的分隔符。但是有时\t不止一个。有的行第一个字符为3，这个3应该忽略。
            // 如果连续出现4个\t，则表示这一列是空，所以需要添加一个空字符串
            if(c!='\t' && i!=0) { // 拼这个单元格中的字符
                cell.append(c);
            }else if(c=='\t' && !cell.toString().equals("")){ // 表示这个单元格中的字符串拼接完了
                columnList.add(cell.toString());
                tNum=1;
                cell=new StringBuffer();
            }else if(c=='\t' && tNum>2){ // 表示这个单元格是空
                columnList.add("");
                tNum=0;
                cell=new StringBuffer();
            }else {
                tNum++;
            }
        }
        if(columnList.get(18).contains(".")){
            columnList.add(columnList.get(columnList.size()-6));
            columnList.set(columnList.size()-7, columnList.get(columnList.size()-8));
            columnList.set(columnList.size()-8, columnList.get(columnList.size()-9));
            columnList.set(columnList.size()-9, "");
//            System.out.println(columnList.get(0));
//            System.out.println(columnList.get(columnList.size()-6));
//            System.out.println(columnList.get(columnList.size()-7));
//            System.out.println(columnList.get(columnList.size()-8));
//            System.out.println(columnList.get(columnList.size()-9));
        }

        return columnList;
    }

    /**
     * 删除hdfs的路径，如：hdfs://centos:9000/output
     * @param conf
     * @param hdfsUrl hdfs的URL，如：hdfs://centos:9000/
     * @param outputPath hdfs的路径，如：output
     */
    public static void deleteOutput(Configuration conf, String hdfsUrl, String outputPath){
        Path path = new Path(outputPath);
        FileSystem fs;
        try {
            fs = FileSystem.get(URI.create(hdfsUrl), conf);
            fs.deleteOnExit(path);
            System.out.println("Delete: " + outputPath);
            fs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 因为txt文件的空值使用--来替代，因此当遇到--时直接返回null。如果不是空值，则根据所需要的数据类型进行转换。
     * 并且需要将科学计数法表示的数字转换为普通数字。
     * @param cell
     * @param clazz
     * @return
     */
    public static Object filterEmptyCell(String cell, Class clazz){
        if(clazz.getName().equals("java.lang.Double")){
            if(cell.equals("--")){
                return null;
            }else{
                return Double.parseDouble(cell);
            }
        }

        if(clazz.getName().equals("java.math.BigInteger")){
            if(cell.equals("--")){
                return null;
            }else{
                // 如果是科学计数法，则将其转换为普通数字
                int index=cell.indexOf("E");
                double num;
                if(index!=-1){
                    num=Double.parseDouble(cell.substring(0,index));
                    int i=Integer.parseInt(cell.substring(index+2, cell.length()));
                    for(int j=0;j<i;j++){
                        num*=10;
                    }
                    System.out.println(num);
                    return BigDecimal.valueOf(num).toBigInteger();
                }else{
                    return BigInteger.valueOf(Long.parseLong(cell));
                }
            }
        }

        if(clazz.getName().equals("java.lang.Long")){
            if(cell.equals("--")){
                return null;
            }else{
                return Long.parseLong(cell);
            }
        }

        if(clazz.getName().equals("java.lang.Integer")){
            if(cell.equals("--")){
                return null;
            }else{
                return Integer.parseInt(cell);
            }
        }

        return null;
    }

    public static String antiFilterEmptyCell(Object obj){
        if(null==obj){
            return "--";
        }else{
            return obj.toString();
        }
    }
}
